%%-*- mode: erlang -*-

%% @doc enable active anti-entropy subsystem
{mapping, "anti_entropy", "riak_core.anti_entropy", [
  {datatype, {enum, [on, off, debug]}},
  {default, on}
]}.

%% Expand the single on/off/debug switch into the {Status, Options} pair
%% stored under riak_core.anti_entropy.
{translation,
  "riak_core.anti_entropy",
  fun(Conf) ->
    case cuttlefish:conf_get("anti_entropy", Conf) of
      off -> {off, []};
      debug -> {on, [debug]};
      %% `on' and any unexpected value both enable AAE without options.
      _OnOrOther -> {on, []}
    end
  end
}.

%% @doc Restrict how fast AAE can build hash trees. Building the tree
%% for a given partition requires a full scan over that partition's
%% data. Once built, trees stay built until they are expired.
%% Config is of the form:
%%   {num-builds, per-timespan}
%% Default is 1 build per hour.
{mapping, "anti_entropy.build_limit.number", "riak_core.anti_entropy_build_limit", [
  {datatype, integer},
  {default, 1}
]}.

%% Second half of the build limit; paired with the number above by the
%% translation that follows.
{mapping, "anti_entropy.build_limit.per_timespan", "riak_core.anti_entropy_build_limit", [
  {datatype, {duration, ms}},
  {default, "1h"}
]}.

%% Combine both settings into the {NumBuilds, Timespan} tuple stored under
%% riak_core.anti_entropy_build_limit.
{translation,
 "riak_core.anti_entropy_build_limit",
 fun(Conf) ->
    Builds = cuttlefish:conf_get("anti_entropy.build_limit.number", Conf),
    Timespan = cuttlefish:conf_get("anti_entropy.build_limit.per_timespan", Conf),
    {Builds, Timespan}
 end}.

%% @doc Determine how often hash trees are expired after being built.
%% Periodically expiring a hash tree ensures the on-disk hash tree
%% data stays consistent with the actual k/v backend data. It also
%% helps Riak identify silent disk failures and bit rot. However,
%% expiration is not needed for normal AAE operation and should be
%% infrequent for performance reasons. The time is given as a
%% duration with a unit suffix (for example "1w") and is stored in
%% milliseconds. The default is 1 week.
{mapping, "anti_entropy.expire", "riak_core.anti_entropy_expire", [
  {default, "1w"},
  {datatype, {duration, ms}}
]}.

%% @doc Limit how many AAE exchanges/builds can happen concurrently.
{mapping, "anti_entropy.concurrency", "riak_core.anti_entropy_concurrency", [
  {default, 2},
  {datatype, integer}
]}.

%% @doc The tick determines how often the AAE manager looks for work
%% to do (building/expiring trees, triggering exchanges, etc).
%% The default is every 15 seconds. Lowering this value will
%% speed up the rate at which all replicas are synced across the
%% cluster. Increasing the value is not recommended.
{mapping, "anti_entropy.tick", "riak_core.anti_entropy_tick", [
  {default, "15s"},
  {datatype, {duration, ms}}
]}.

%% @doc The directory where AAE hash trees are stored.
{mapping, "anti_entropy.data_dir", "riak_core.anti_entropy_data_dir", [
  {default, "{{platform_data_dir}}/anti_entropy"}
]}.


%% @doc This parameter defines the percentage, 1 to 100, of total
%% server memory to assign to leveldb.  leveldb will dynamically
%% adjust its internal cache sizes as Riak activates / inactivates
%% vnodes on this server to stay within this size.  The memory size
%% can alternately be assigned as a byte count via
%% anti_entropy.total_leveldb_mem instead.
{mapping, "anti_entropy.total_leveldb_mem_percent", "riak_core.aae_total_leveldb_mem_percent",
 [{default, "80"},
  {datatype, integer}]}.


%% @doc This parameter defines the number of bytes of
%% server memory to assign to leveldb.  leveldb will dynamically
%% adjust its internal cache sizes as Riak activates / inactivates
%% vnodes on this server to stay within this size.  The memory size
%% can alternately be assigned as percentage of total server memory
%% via anti_entropy.total_leveldb_mem_percent instead.
{mapping, "anti_entropy.total_leveldb_mem", "riak_core.aae_total_leveldb_mem",
 [{datatype, bytesize},
  {level, advanced}]}.


%% @doc The 'sync' parameter defines how new key/value data is placed in the
%% recovery log. The recovery log is only used if the Riak program crashes or
%% the server loses power unexpectedly. The parameter's original intent was
%% to guarantee that each new key / value was written to the physical disk
%% before leveldb responded with "write good". The reality in modern servers
%% is that many layers of data caching exist between the database program and
%% the physical disks. This flag influences only one of the layers.
{mapping, "anti_entropy.sync", "riak_core.aae_sync",
 [{default, false},
  {datatype, {enum, [true, false]}},
  {level, advanced}]}.

%% @doc limited_developer_mem is a Riak specific option that is used when
%% a developer is testing a high number of vnodes and/or several VMs
%% on a machine with limited physical memory.  Do NOT use this option
%% if making performance measurements.  This option overwrites values
%% given to write_buffer_size_min and write_buffer_size_max.
{mapping, "anti_entropy.limited_developer_mem", "riak_core.aae_limited_developer_mem",
 [{default, false},
  {datatype, {enum, [true, false]}},
  {level, advanced}]}.


%% @doc Each vnode first stores new key/value data in a memory based write
%% buffer. This write buffer is in parallel to the recovery log mentioned
%% in the "sync" parameter. Riak creates each vnode with a randomly sized
%% write buffer for performance reasons. The random size is somewhere
%% between write_buffer_size_min and write_buffer_size_max.
{mapping, "anti_entropy.write_buffer_size_min", "riak_core.aae_write_buffer_size_min",
 [{default, "30MB"},
  {datatype, bytesize},
  {level, advanced}]}.

%% Upper bound of the write buffer size range; it has no @doc of its own and
%% is described together with anti_entropy.write_buffer_size_min.
{mapping, "anti_entropy.write_buffer_size_max", "riak_core.aae_write_buffer_size_max",
 [{default, "60MB"},
  {datatype, bytesize},
  {level, advanced}]}.

%% NOTE(review): the datatype is declared as {flag, off, on} and the target
%% key is a kill switch, so the on/off sense is presumably inverted on
%% purpose (throttle "on" => kill switch not engaged) -- confirm against
%% cuttlefish's flag datatype semantics.
%% @doc Whether the distributed throttle for active anti-entropy is
%% enabled.
{mapping, "anti_entropy.throttle", "riak_core.aae_throttle_kill_switch", [
  {default, on},
  {datatype, {flag, off, on}},
  hidden
]}.

%% @doc Sets the throttling tiers for active anti-entropy. Each tier
%% is a minimum vnode mailbox size and a time-delay that the throttle
%% should observe at that size and above. For example:
%%
%%     anti_entropy.throttle.tier1.mailbox_size = 0
%%     anti_entropy.throttle.tier1.delay = 0ms
%%     anti_entropy.throttle.tier2.mailbox_size = 40
%%     anti_entropy.throttle.tier2.delay = 5ms
%%
%% If configured, there must be a tier which includes a mailbox size
%% of 0. Both .mailbox_size and .delay must be set for each tier.
%% @see anti_entropy.throttle
{mapping,
 "anti_entropy.throttle.$tier.mailbox_size",
 "riak_core.aae_throttle_limits", [
  {datatype, integer},
  hidden,
  {validators, ["non_negative"]}
]}.

%% @see anti_entropy.throttle.$tier.mailbox_size
{mapping,
 "anti_entropy.throttle.$tier.delay",
 "riak_core.aae_throttle_limits", [
  {datatype, {duration, ms}},
  hidden
]}.

%% Accepts zero and anything that compares greater than zero.
{validator,
 "non_negative",
 "must be greater than or equal to 0",
 fun(Number) -> 0 =< Number end}.

%% Build the sorted list of {MailboxSize - 1, Delay} throttle limits from all
%% anti_entropy.throttle.$tier.* settings.
{translation,
 "riak_core.aae_throttle_limits",
 fun(Conf) ->
   %% Gather tier names from both settings, so a tier that only sets one of
   %% mailbox_size/delay is still looked up for both below.
   SizeTiers = cuttlefish_variable:fuzzy_matches(["anti_entropy", "throttle", "$tier", "mailbox_size"], Conf),
   DelayTiers = cuttlefish_variable:fuzzy_matches(["anti_entropy", "throttle", "$tier", "delay"], Conf),
   Tiers = lists:usort(SizeTiers ++ DelayTiers),
   ToLimit =
     fun({"$tier", Name}) ->
         Size = cuttlefish:conf_get(["anti_entropy", "throttle", Name, "mailbox_size"], Conf),
         Wait = cuttlefish:conf_get(["anti_entropy", "throttle", Name, "delay"], Conf),
         %% The configured size is shifted down by one, so a mailbox_size of 0
         %% becomes the -1 lower bound checked below.
         {Size - 1, Wait}
     end,
   Limits = lists:sort([ToLimit(Tier) || Tier <- Tiers]),
   case Limits of
       %% After sorting, the -1 "minimum" bound must come first; without it
       %% the configuration is rejected.
       [{-1, _} | _] ->
           Limits;
       _ ->
           cuttlefish:invalid("anti_entropy.throttle tiers must include a tier with mailbox_size 0")
   end
 end
}.


%% @doc Each database .sst table file can include an optional "bloom filter"
%% that is highly effective in shortcutting data queries that are destined
%% to not find the requested key. The bloom_filter typically increases the
%% size of an .sst table file by about 2%. This option must be set to on
%% in the riak.conf to take effect.
{mapping, "anti_entropy.bloomfilter", "riak_core.aae_use_bloomfilter",
 [{datatype, {enum, [on, off]}},
  {default, on}]}.

%% Convert the on/off enum into the boolean stored under
%% riak_core.aae_use_bloomfilter.
{translation,
 "riak_core.aae_use_bloomfilter",
 fun(Conf) ->
     Choice = cuttlefish:conf_get("anti_entropy.bloomfilter", Conf),
     %% Only an explicit `off' disables the filter; `on' and any other value
     %% enable it.
     Choice =/= off
 end
}.


%% The mappings below all target riak_core.aae_* keys; judging by their docs
%% they tune the leveldb instance used for AAE (presumably the hash tree
%% store -- confirm against the code that reads these keys).
%% @doc sst_block_size defines the size threshold for a block / chunk of data
%% within one .sst table file. Each new block gets an index entry in the .sst
%% table file's master index.
{mapping, "anti_entropy.block_size", "riak_core.aae_sst_block_size",
[{default, "4KB"},
 {datatype, bytesize},
 {level, advanced}]}.


%% @doc block_restart_interval defines the key count threshold for a new key
%% entry in the key index for a block.
%% Most clients should leave this parameter alone.
{mapping, "anti_entropy.block_restart_interval", "riak_core.aae_block_restart_interval",
 [{default, 16},
  {datatype, integer},
  {level, advanced}]}.


%% @doc verify_checksums controls whether or not validation occurs when Riak
%% requests data from the leveldb database on behalf of the user.
{mapping, "anti_entropy.verify_checksums", "riak_core.aae_verify_checksums",
 [{default, true},
  {datatype, {enum, [true, false]}},
  {level, advanced}]}.


%% @doc verify_compaction controls whether or not validation occurs when
%% leveldb reads data as part of its background compaction operations.
{mapping, "anti_entropy.verify_compaction", "riak_core.aae_verify_compaction",
 [{default, true},
  {datatype, {enum, [true, false]}},
  {level, advanced}]}.

%% @doc The number of worker threads performing LevelDB operations.
{mapping, "anti_entropy.threads", "riak_core.aae_eleveldb_threads",
 [{default, 71},
  {datatype, integer},
  {level, advanced}]}.

%% @doc Option to override LevelDB's use of fadvise(DONTNEED) with
%% fadvise(WILLNEED) instead.  WILLNEED can reduce disk activity on
%% systems where physical memory exceeds the database size.
{mapping, "anti_entropy.fadvise_willneed", "riak_core.aae_fadvise_willneed",
 [{default, false},
  {datatype, {enum, [true, false]}},
  {level, advanced}]}.

%% Default Bucket Properties

%% @doc The number of replicas stored. Note: See Replication
%% Properties for further discussion.
%% http://docs.basho.com/riak/latest/dev/advanced/cap-controls/
{mapping, "buckets.default.n_val", "riak_core.default_bucket_props.n_val", [
  {datatype, integer},
  {default, 3},
  hidden
]}.

%% The three validators named below ("ring_size^2", "ring_size_max",
%% "ring_size_min") are defined as separate {validator, ...} terms elsewhere
%% in this file.
%% @doc Number of partitions in the cluster (only valid when first
%% creating the cluster). Must be a power of 2, minimum 8 and maximum
%% 1024.
{mapping, "ring_size", "riak_core.ring_creation_size", [
  {datatype, integer},
  {default, 64},
  {validators, ["ring_size^2", "ring_size_max", "ring_size_min"]},
  {commented, 64}
]}.

%% ring_size validators
%% Upper bound check for ring_size: values above 1024 fail validation.
{validator, "ring_size_max",
 "2048 and larger are supported, but considered advanced config",
 fun(RingSize) -> 1024 >= RingSize end}.

%% Default bucket property 'pr'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.pr", "riak_core.default_bucket_props.pr", [
  {default, "0"},
  {level, advanced}
]}.

%% Cut and paste translation screams to be rewritten as a datatype, but that's a
%% "nice to have"
%%
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.pr",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.pr", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.pr must be quorum, all, or an integer")
        end
    end
  end
}.

%% Default bucket property 'r'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.r", "riak_core.default_bucket_props.r", [
  {default, "quorum"},
  {level, advanced}
]}.
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.r",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.r", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.r must be quorum, all, or an integer")
        end
    end
  end
}.

%% Default bucket property 'w'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.w", "riak_core.default_bucket_props.w", [
  {default, "quorum"},
  {level, advanced}
]}.
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.w",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.w", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.w must be quorum, all, or an integer")
        end
    end
  end
}.

%% Default bucket property 'pw'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.pw", "riak_core.default_bucket_props.pw", [
  {default, "0"},
  {level, advanced}
]}.
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.pw",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.pw", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.pw must be quorum, all, or an integer")
        end
    end
  end
}.

%% Default bucket property 'dw'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.dw", "riak_core.default_bucket_props.dw", [
  {default, "quorum"},
  {level, advanced}
]}.
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.dw",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.dw", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.dw must be quorum, all, or an integer")
        end
    end
  end
}.

%% Default bucket property 'rw'. No datatype is declared, so the value is kept
%% as a raw string and converted by the translation below.
{mapping, "buckets.default.rw", "riak_core.default_bucket_props.rw", [
  {default, "quorum"},
  {level, advanced}
]}.
%% Accepts "quorum", "all" or a decimal integer string and yields
%% quorum | all | integer(). Any other value is rejected through
%% cuttlefish:invalid/1 instead of silently storing the atom 'error' as the
%% bucket property.
{translation,
  "riak_core.default_bucket_props.rw",
  fun(Conf) ->
    case cuttlefish:conf_get("buckets.default.rw", Conf) of
      "quorum" -> quorum;
      "all" -> all;
      Setting ->
        %% Only list_to_integer/1 is protected by the try; badarg means the
        %% value is not an integer string.
        try list_to_integer(Setting) of
          Int -> Int
        catch
          error:badarg ->
            cuttlefish:invalid("buckets.default.rw must be quorum, all, or an integer")
        end
    end
  end
}.

%% {mapping, "buckets.default.basic_quorum", "riak_core.default_bucket_props.basic_quorum", false},
%% {mapping, "buckets.default.notfound_ok", "riak_core.default_bucket_props.notfound_ok", true}

%% @doc whether or not siblings are allowed.
%% Note: See Vector Clocks for a discussion of sibling resolution.
{mapping, "buckets.default.siblings", "riak_core.default_bucket_props.allow_mult", [
  {datatype, {enum, [on, off]}},
  {default, on},
  {level, advanced}
]}.

%% Convert the on/off enum into the boolean allow_mult bucket property.
{translation,
 "riak_core.default_bucket_props.allow_mult",
 fun(Conf) ->
   case cuttlefish:conf_get("buckets.default.siblings", Conf) of
     off -> false;
     %% `on' and any unexpected value both allow siblings.
     _OnOrOther -> true
   end
 end}.

%% Power-of-two check for ring_size. A power of two has exactly one bit set,
%% so clearing its lowest set bit (N band (N - 1)) leaves zero. Zero itself
%% also satisfies this test.
{validator, "ring_size^2", "not a power of 2",
 fun(RingSize) ->
  0 =:= (RingSize band (RingSize - 1))
 end}.

%% Lower bound check for ring_size: values below 8 fail validation.
{validator, "ring_size_min", "must be at least 8",
 fun(RingSize) -> 8 =< RingSize end}.

%% @doc Number of concurrent node-to-node transfers allowed.
{mapping, "transfer_limit", "riak_core.handoff_concurrency", [
  {datatype, integer},
  {default, 2},
  {commented, 2}
]}.

%% NOTE(review): this default uses the $(platform_data_dir) form while
%% anti_entropy.data_dir uses the {{platform_data_dir}} template form;
%% presumably the former is resolved by cuttlefish at config-generation time
%% and the latter when the schema is templated -- confirm.
%% @doc Default location of ringstate
{mapping, "ring.state_dir", "riak_core.ring_state_dir", [
  {datatype, directory},
  {default, "$(platform_data_dir)/ring"},
  hidden
]}.

%% None of the three ssl.* mappings declares a {default, ...}; the
%% {commented, ...} path is presumably only emitted as a commented-out example
%% in the generated config, which is what the trailing "for example:" in each
%% @doc leads into -- confirm against cuttlefish.
%% @doc Default cert location for https can be overridden
%% with the ssl config variable, for example:
{mapping, "ssl.certfile", "riak_core.ssl.certfile", [
  {datatype, file},
  {commented, "$(platform_etc_dir)/cert.pem"}
]}.

%% @doc Default key location for https can be overridden with the ssl
%% config variable, for example:
{mapping, "ssl.keyfile", "riak_core.ssl.keyfile", [
  {datatype, file},
  {commented, "$(platform_etc_dir)/key.pem"}
]}.

%% @doc Default signing authority location for https can be overridden
%% with the ssl config variable, for example:
{mapping, "ssl.cacertfile", "riak_core.ssl.cacertfile", [
  {datatype, file},
  {commented, "$(platform_etc_dir)/cacertfile.pem"}
]}.

%% @doc handoff.ip is the network address that Riak binds to for
%% intra-cluster data handoff.
{mapping, "handoff.ip", "riak_core.handoff_ip", [
  {default, "{{handoff_ip}}" },
  {datatype, string},
  % XXX: The not_localhost validator is left out to allow local development.
  % For production, use the commented-out validators line below in place of
  % the active one.
  % {validators, ["valid_ipaddr", "not_localhost"]},
  {validators, ["valid_ipaddr"]},
  hidden
]}.

%% True when inet_parse:address/1 can parse the string as an IP address.
{validator,
  "valid_ipaddr",
  "must be a valid IP address",
  fun(Candidate) ->
    Parsed = inet_parse:address(Candidate),
    case Parsed of
      {error, _} -> false;
      {ok, _} -> true
    end
  end}.

%% Rejects loopback addresses and strings that do not parse as an IP address.
%% Returns true only for a parseable, non-loopback address.
{validator,
  "not_localhost",
  "can't be a local ip",
  fun(AddrString) ->
    case inet_parse:address(AddrString) of
      %% The whole 127.0.0.0/8 block is loopback, not just 127.0.x.x.
      {ok, {127, _, _, _}} -> false;
      %% IPv6 loopback (::1).
      {ok, {0, 0, 0, 0, 0, 0, 0, 1}} -> false;
      {ok, _} -> true;
      {error, _} -> false
    end
  end}.

%% NOTE(review): {{handoff_port}} is an unquoted template placeholder, so this
%% term is presumably only valid Erlang once the template has been
%% substituted with an integer -- confirm against the build.
%% @doc handoff.port is the TCP port that Riak uses for
%% intra-cluster data handoff.
{mapping, "handoff.port", "riak_core.handoff_port", [
  {default, {{handoff_port}} },
  {datatype, integer},
  hidden
]}.

%% @doc To encrypt riak_core intra-cluster data handoff traffic,
%% uncomment the following line and edit its path to an appropriate
%% certfile and keyfile.  (This example uses a single file with both
%% items concatenated together.)
{mapping, "handoff.ssl.certfile", "riak_core.handoff_ssl_options.certfile", [
%%  {commented, "/tmp/erlserver.pem"},
  {datatype, file},
  hidden
]}.

%% @doc Use this if you need a separate keyfile for handoff.
{mapping, "handoff.ssl.keyfile", "riak_core.handoff_ssl_options.keyfile", [
  {datatype, file},
  hidden
]}.

%% NOTE(review): both handoff direction settings target riak_core.disable_*
%% keys and declare {flag, off, on}, so the on/off sense is presumably
%% inverted on purpose (setting "on" => disable key false) -- confirm against
%% cuttlefish's flag datatype semantics.
%% @doc Enables/disables outbound handoff transfers for this node. If you
%% turn this setting off at runtime with riak-admin, it will kill any
%% outbound handoffs currently running.
{mapping, "handoff.outbound", "riak_core.disable_outbound_handoff", [
  {default, on},
  {datatype, {flag, off, on}},
  hidden
]}.

%% @doc Enables/disables inbound handoff transfers for this node. If you
%% turn this setting off at runtime with riak-admin, it will kill any
%% inbound handoffs currently running.
{mapping, "handoff.inbound", "riak_core.disable_inbound_handoff", [
  {default, on},
  {datatype, {flag, off, on}},
  hidden
]}.

%% NOTE(review): the original author marked this description as uncertain
%% ("I think"); verify against the code that reads
%% riak_core.vnode_inactivity_timeout.
%% @doc The time a vnode has to be idle for a handoff to occur.
{mapping, "handoff.inactivity_timeout", "riak_core.vnode_inactivity_timeout", [
  {default, "1m"},
 {datatype, {duration, ms}}
]}.

%% @doc DTrace support Do not enable 'dtrace' unless your Erlang/OTP
%% runtime is compiled to support DTrace.  DTrace is available in
%% R15B01 (supported by the Erlang/OTP official source package) and in
%% R14B04 via a custom source repository & branch.
{mapping, "dtrace", "riak_core.dtrace_support", [
  {default, off},
  {datatype, flag}
]}.

%% consistent on/off (in lieu of enabled/disabled, true/false)
%%
%% The mapping above uses the `flag' datatype, so the setting has already been
%% converted to a boolean by the time this translation runs. Matching only on
%% the atoms on/off therefore always fell through to `false' and dtrace could
%% never be enabled. `true' is now accepted; `on' is still matched in case the
%% raw atom ever reaches this point.
{ translation,
  "riak_core.dtrace_support",
  fun(Conf) ->
    case cuttlefish:conf_get("dtrace", Conf) of
      true -> true;
      on -> true;
      %% false, off and anything unexpected leave dtrace disabled.
      _Disabled -> false
    end
  end
}.

%% The five platform_*_dir settings share one description: only
%% platform_bin_dir carries the @doc, the others point at it with @see.
%% @doc Platform-specific installation paths (substituted by rebar)
{mapping, "platform_bin_dir", "riak_core.platform_bin_dir", [
  {datatype, directory},
  {default, "{{platform_bin_dir}}"}
]}.

%% @see platform_bin_dir
{mapping, "platform_data_dir", "riak_core.platform_data_dir", [
  {datatype, directory},
  {default, "{{platform_data_dir}}"}
]}.

%% @see platform_bin_dir
{mapping, "platform_etc_dir", "riak_core.platform_etc_dir", [
  {datatype, directory},
  {default, "{{platform_etc_dir}}"}
]}.

%% @see platform_bin_dir
{mapping, "platform_lib_dir", "riak_core.platform_lib_dir", [
  {datatype, directory},
  {default, "{{platform_lib_dir}}"}
]}.

%% @see platform_bin_dir
{mapping, "platform_log_dir", "riak_core.platform_log_dir", [
  {datatype, directory},
  {default, "{{platform_log_dir}}"}
]}.

%% @doc Enable consensus subsystem. Set to 'on' to enable the
%% consensus subsystem used for strongly consistent Riak operations.
{mapping, "strong_consistency", "riak_core.enable_consensus", [
  {datatype, flag},
  {default, off},
  {commented, on}
]}.

%% @doc Whether to enable the background manager globally. When
%% enabled, participating Riak subsystems will coordinate access to
%% shared resources. This will help to prevent system response
%% degradation under times of heavy load from multiple background
%% tasks. Specific subsystems may also have their own controls over
%% use of the background manager.
{mapping, "background_manager", "riak_core.use_background_manager", [
    {datatype, flag},
    {default, off},
    hidden
]}.

%% @doc Interval of time between vnode management
%% activities. Modifying this will change the amount of time between
%% attempts to trigger handoff between this node and any other member
%% of the cluster.
{mapping, "vnode_management_timer", "riak_core.vnode_management_timer", [
    {default, "10s"},
    {datatype, {duration, ms}},
    hidden
]}.

%% Unlike the other mappings in this file, this one targets the `setup'
%% application ("setup.home") rather than riak_core.
%% @doc Home directory for the run user
{mapping, "run_user_home", "setup.home",
  [{default, "{{run_user_home}}"},
   hidden,
   {datatype, string}]}.

%% Async Job Management
%%
%% This is a translation for mappings that appear in other schema files.
%% Mappings are from "cluster.job.$namespace.$operation"* to
%% "riak_core.job_accept_class" with required attributes
%%  [merge, {datatype, {flag, enabled, disabled}}].**
%% *  Mappings are only performed on elements with exactly the number of
%%    segments shown - any other number of elements, even with a matching
%%    prefix, is ignored.
%% ** The 'datatype' should be 'flag', and 'enabled'/'disabled' are our
%%    conventions, but any OnFlag/OffFlag pair can be used as long as they map
%%    to boolean values.
%% Other attributes, such as 'hidden' or {default, X} are fine, since they
%% don't make it down the stack to here.
%% Job classes that should be enabled by default MUST have a {default, enabled}
%% attribute, as the runtime filter only defaults to accept when no values have
%% been set from ANY schema file.
%%
%% Example:
%%  {mapping, "cluster.job.harry.fold", "riak_core.job_accept_class", [
%%      merge,
%%      {datatype, {flag, enabled, disabled}},
%%      {default, enabled}
%%  ]}.
%%  {mapping, "cluster.job.alice.list", "riak_core.job_accept_class", [
%%      merge,
%%      {datatype, {flag, enabled, disabled}},
%%      {default, disabled}
%%  ]}.
%% Results in:
%%  {riak_core, [
%%      ...
%%      {job_accept_class, [{harry, fold}]}
%%      ...
%%  ]}.
%%
%% Collect every enabled "cluster.job.$namespace.$operation" setting into a
%% sorted list of {Namespace, Operation} atom pairs.
{translation,
 "riak_core.job_accept_class",
 fun(Conf) ->
    Settings = cuttlefish_variable:filter_by_prefix(["cluster", "job"], Conf),
    Accept =
     %% Exactly four segments and enabled: record the class.
     fun({[_, _, Namespace, Operation], true}, Classes) ->
            [{erlang:list_to_atom(Namespace), erlang:list_to_atom(Operation)} | Classes];
        %% Exactly four segments and disabled: nothing to record.
        ({[_, _, _, _], false}, Classes) ->
            Classes;
        %% Exactly four segments but a non-boolean value: reject the setting.
        ({[_, _, _, _], _} = Bad, _) ->
            cuttlefish:invalid(io_lib:format("~p", [Bad]));
        %% Any other number of segments is ignored.
        (_, Classes) ->
            Classes
    end,
    lists:sort(lists:foldl(Accept, [], Settings))
 end}.


%% @doc Some requests to the vnodes are handled by an asynchronous worker pool.
%% This parameter allows for tuning this pool's behaviour when it comes to
%% dealing with requests that are queued.
%% The default (fifo) will serve requests in the order they arrive at the worker
%% pool. The alternative is to serve the requests in the reverse order, dealing
%% with the most recent request first.
%% There are pros and cons for both approaches; it is best to test out what
%% works best for the desired characteristics.
%%
%% As a very rough rule of thumb:
%%  - fifo will lead to lower extremes
%%  - filo will lead to lower medians
{mapping, "worker.queue_strategy", "riak_core.queue_worker_strategy",
 [{default, fifo},
  {datatype, {enum, [fifo, filo]}}]}.
